[[tombstones]]
= Null Payloads and Log Compaction of 'Tombstone' Records

When you use https://kafka.apache.org/documentation/#compaction[Log Compaction], you can send and receive messages with `null` payloads to identify the deletion of a key.

You can also receive `null` values for other reasons, such as a `Deserializer` that might return `null` when it cannot deserialize a value.
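
To make the tombstone semantics concrete, the following is a toy model in plain Java, not Kafka's actual implementation. It replays a list of key/value records in order and keeps the latest value for each key, treating a `null` value as a deletion of that key. The `CompactionSketch` class and its `compact` method are hypothetical names introduced only for this illustration. A real broker also retains tombstones for a configurable period (`delete.retention.ms`) before removing them, which this sketch ignores.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    // Hypothetical helper: replays the log in order and keeps the latest value per key.
    // A record with a null value is treated as a tombstone and removes the key.
    public static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey());
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
                new SimpleEntry<String, String>("user-1", "alice"),
                new SimpleEntry<String, String>("user-2", "bob"),
                new SimpleEntry<String, String>("user-1", "alice-updated"),
                new SimpleEntry<String, String>("user-2", null)); // tombstone for user-2

        System.out.println(compact(log)); // prints {user-1=alice-updated}
    }
}
```

A consumer that rebuilds state from a compacted topic typically applies the same rule: a `null` value means "forget this key".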

To send a `null` payload by using the `KafkaTemplate`, you can pass `null` as the value argument of the `send()` methods.
One exception to this is the `send(Message<?> message)` variant.
Because a `spring-messaging` `Message<?>` cannot have a `null` payload, you can use the special `KafkaNull` payload type instead, and the framework sends `null`.
For convenience, the static `KafkaNull.INSTANCE` is provided.
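
The two sending styles described above might look like the following minimal sketch. It assumes a `KafkaTemplate<String, String>` is already configured elsewhere and that `spring-kafka` is on the classpath. The `TombstoneSender` class, its method names, and the `myTopic` topic are illustrative assumptions, not part of the framework.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical helper class for illustration only.
public class TombstoneSender {

    private final KafkaTemplate<String, String> template;

    public TombstoneSender(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // send(topic, key, value): a null value argument produces a record with a null value.
    public void deleteWithValueArgument(String key) {
        this.template.send("myTopic", key, null);
    }

    // send(Message<?>): KafkaNull.INSTANCE stands in for the null payload,
    // and the framework sends null as the record value.
    public void deleteWithMessage(String key) {
        Message<KafkaNull> message = MessageBuilder
                .withPayload(KafkaNull.INSTANCE)
                .setHeader(KafkaHeaders.TOPIC, "myTopic")
                .setHeader(KafkaHeaders.KEY, key)
                .build();
        this.template.send(message);
    }
}
```

Either method should result in a tombstone for `key` on a compacted topic. Which one to use depends mainly on whether the rest of the application already works with `Message<?>` objects.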

When you use a message listener container, the `ConsumerRecord` received for such a message has a `null` `value()`.

To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "deleted".
The following example shows such a configuration:

[source, java]
----
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}
----

When you use a class-level `@KafkaListener` with multiple `@KafkaHandler` methods, some additional configuration is needed.
Specifically, you need a `@KafkaHandler` method with a `KafkaNull` payload.
The following example shows how to configure one:

[source, java]
----
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
----

Note that the argument passed to the `delete` method is `null`, not a `KafkaNull` instance.

TIP: See xref:tips.adoc[Manually Assigning All Partitions].

IMPORTANT: This feature requires a `KafkaNullAwarePayloadArgumentResolver`, which the framework configures when you use the default `MessageHandlerMethodFactory`.
When you use a custom `MessageHandlerMethodFactory`, see xref:kafka/serdes.adoc#custom-arg-resolve[Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener`].

